package com.gxlevi.service

import java.util
import java.util.UUID

import com.gxlevi.bean.{PageViewsBeanCase, WebLogBean}
import com.gxlevi.util.DateUtil
import org.apache.spark.RangePartitioner
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.CollectionAccumulator

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/**
 * Turns cleaned web-log records into page-view records carrying a session id,
 * a step number inside the session and a page stay time, and saves them as text.
 *
 * The log is range partitioned and sorted by "guid&time_local". Sessions are first
 * built independently inside every partition ([[generateSessionId]]); sessions that
 * were cut in two by a partition boundary are then detected on the driver
 * ([[processBoundaryMap]]) and stitched back together ([[repairBoundarySession]]).
 *
 * Boundary information is exchanged as "&"-separated strings, so the scheme assumes
 * that neither the guid nor the time string contains an "&".
 */
object PageViewService {

  /**
   * Largest gap between two consecutive hits of one user that still counts as the
   * same session. Expressed in the unit returned by `DateUtil.getTimeDiff`
   * (presumably milliseconds, i.e. 30 minutes - confirm against DateUtil).
   */
  private val SessionTimeout: Long = 30 * 60 * 1000L

  /** Stay time given to a hit that has no follow-up hit in the same session (same unit as above). */
  private val DefaultPageStay: Long = 60000L

  /** Number of range partitions requested for the sorted log. */
  private val PartitionCount: Int = 100

  /** Key suffix of the boundary entry describing the first record of a partition. */
  private val FirstSuffix: String = "&first"

  /** Key suffix of the boundary entry describing the last record of a partition. */
  private val LastSuffix: String = "&last"

  /**
   * Builds the page-view model from the filtered web log and writes it to "/pageViewRddTxt".
   *
   * @param filterStaticWebLogRdd web-log records to sessionize
   */
  def savePageViewToHdfs(filterStaticWebLogRdd: RDD[WebLogBean]): Unit = {
    // Use the context that owns the input RDD instead of relying on a default SparkSession being set.
    val sc = filterStaticWebLogRdd.sparkContext

    val uidTimeRdd: RDD[(String, WebLogBean)] =
      filterStaticWebLogRdd.map(bean => (bean.guid + "&" + bean.time_local, bean))

    // Range partition AND sort inside each partition in a single shuffle. A plain partitionBy after
    // sortByKey reshuffles the data without preserving the order inside the new partitions, which the
    // sessionization below relies on.
    val rangeRdd: RDD[(String, WebLogBean)] =
      uidTimeRdd.repartitionAndSortWithinPartitions(new RangePartitioner(PartitionCount, uidTimeRdd))

    val headTailList: CollectionAccumulator[(String, String)] =
      sc.collectionAccumulator[(String, String)]("headTailList")

    val questionSessionRdd: RDD[(WebLogBean, String, Int, Long)] = generateSessionId(rangeRdd, headTailList)

    // Session ids are random, so the partitions must never be recomputed between the pass that fills
    // the accumulator and the pass that repairs the boundaries: spill to disk rather than drop blocks.
    // NOTE: accumulator updates made inside a transformation can still be applied more than once if a
    // task is retried; duplicates for a key collapse to the last value when the map is built below.
    questionSessionRdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)
    try {
      // Force evaluation so that the accumulator is fully populated before it is read.
      questionSessionRdd.count()

      // Debug aid: dump the uncorrected sessions.
      //questionSessionRdd.saveAsTextFile("/questionSessionRdd")

      import scala.collection.JavaConverters._
      val boundaryEntries: Seq[(String, String)] = headTailList.value.asScala.toSeq
      val boundaryMap: mutable.HashMap[String, String] = mutable.HashMap(boundaryEntries: _*)

      val correctMap: mutable.HashMap[String, String] = processBoundaryMap(boundaryMap)
      val questionBroadCast: Broadcast[mutable.HashMap[String, String]] = sc.broadcast(correctMap)

      val correctRdd: RDD[(WebLogBean, String, Int, Long)] =
        repairBoundarySession(questionSessionRdd, questionBroadCast)

      val pageViewRdd: RDD[PageViewsBeanCase] = correctRdd.map {
        case (bean, sessionId, step, stayTime) =>
          PageViewsBeanCase(
            sessionId,
            bean.remote_addr,
            bean.time_local,
            bean.request,
            step,
            stayTime,
            bean.http_referer,
            bean.http_user_agent,
            bean.body_bytes_sent,
            bean.status,
            bean.guid
          )
      }
      pageViewRdd.saveAsTextFile("/pageViewRddTxt")
    } finally {
      questionSessionRdd.unpersist()
    }
  }

  /**
   * Assigns a session id, a step number and a stay time to every record, partition by partition.
   *
   * Two consecutive records stay in one session when they have the same guid and are less than
   * [[SessionTimeout]] apart; the stay time of a record is the gap to its successor, or
   * [[DefaultPageStay]] when it is the last record of its session or of the partition.
   *
   * For every non-empty partition two entries are added to `headTailList`:
   *  - "index&first" -> "guid&time&sessionId"
   *  - "index&last"  -> "guid&time&sessionId&step&partitionSize"
   *
   * @param rangeRdd     records keyed by "guid&time", sorted by key inside each partition
   * @param headTailList accumulator receiving the first/last record description of each partition
   * @return (record, sessionId, step, stayTime) tuples in the input order
   */
  def generateSessionId(rangeRdd: RDD[(String, WebLogBean)],
                        headTailList: CollectionAccumulator[(String, String)]): RDD[(WebLogBean, String, Int, Long)] = {
    rangeRdd.mapPartitionsWithIndex { (index, iter) =>
      // Vector gives cheap indexed access and size; a List made the old index loop quadratic.
      val records: Vector[(String, WebLogBean)] = iter.toVector
      val result = new mutable.ArrayBuffer[(WebLogBean, String, Int, Long)](records.size)

      if (records.nonEmpty) {
        var sessionId = UUID.randomUUID().toString
        var step = 1

        headTailList.add((s"$index$FirstSuffix", s"${records.head._1}&$sessionId"))

        // A record can only be emitted once its successor is known, because the successor decides
        // both the stay time and whether the session continues.
        records.iterator.zip(records.iterator.drop(1)).foreach {
          case ((prevKey, prevBean), (curKey, _)) =>
            val prevParts: Array[String] = prevKey.split("&")
            val curParts: Array[String] = curKey.split("&")
            val timeDiff = DateUtil.getTimeDiff(prevParts(1), curParts(1))

            if (prevParts(0) == curParts(0) && timeDiff < SessionTimeout) {
              result += ((prevBean, sessionId, step, timeDiff))
              step += 1
            } else {
              result += ((prevBean, sessionId, step, DefaultPageStay))
              sessionId = UUID.randomUUID().toString
              step = 1
            }
        }

        // The last record has no successor inside this partition. Its boundary entry is always
        // published - also for a single-record partition - so that the driver can link it to the
        // next partition.
        val (lastKey, lastBean) = records.last
        result += ((lastBean, sessionId, step, DefaultPageStay))
        headTailList.add((s"$index$LastSuffix", s"$lastKey&$sessionId&$step&${records.size}"))
      }

      result.iterator
    }
  }

  /**
   * Finds the sessions that continue across a partition boundary and computes the corrections.
   *
   * Neighbouring non-empty partitions are compared in index order: when the last record of the
   * previous partition and the first record of the current one have the same guid and are less
   * than [[SessionTimeout]] apart, two corrections are produced:
   *  - "prev&last" -> real stay time of the previous partition's last record
   *  - "cur&first" -> "correctSessionId&stepOffset&sessionIdToReplace"
   *
   * When the current partition consists of a single session that is itself a continuation, its
   * "&last" entry in `map` is extended in place with the corrected session id and step, so that
   * the correction propagates to the following partition. `map` is therefore mutated.
   *
   * @param map boundary entries as produced by [[generateSessionId]]
   * @return the corrections to apply, keyed like the boundary entries
   */
  def processBoundaryMap(map: mutable.HashMap[String, String]): mutable.HashMap[String, String] = {
    val correctMap: mutable.HashMap[String, String] = new mutable.HashMap[String, String]()

    // Only partitions that actually reported a first record take part; empty partitions report
    // nothing and are skipped, which keeps their neighbours correctly linked.
    val partitionIds: Vector[Int] = map.keysIterator
      .filter(_.endsWith(FirstSuffix))
      .flatMap(key => scala.util.Try(key.stripSuffix(FirstSuffix).toInt).toOption)
      .toVector
      .sorted

    partitionIds.zip(partitionIds.drop(1)).foreach { case (prev, cur) =>
      val curLastMsg: String = lastMessage(map, cur)
      val prevLast: Array[String] = lastMessage(map, prev).split("&")
      val curFirst: Array[String] = map(s"$cur$FirstSuffix").split("&")
      val curLast: Array[String] = curLastMsg.split("&")

      if (prevLast(0) == curFirst(0)) {
        val timeDiff = DateUtil.getTimeDiff(prevLast(1), curFirst(1))
        if (timeDiff < SessionTimeout) {
          // More than 5 fields means the entry was extended by an earlier iteration: the trailing
          // two fields then hold the corrected session id and step of that record.
          val (prevSession, prevStep) =
            if (prevLast.length > 5) (prevLast(prevLast.length - 2), prevLast(prevLast.length - 1))
            else (prevLast(2), prevLast(3))

          correctMap.put(s"$prev$LastSuffix", timeDiff.toString)
          correctMap.put(s"$cur$FirstSuffix", s"$prevSession&$prevStep&${curFirst(2)}")

          // First and last record share a session id, i.e. the whole partition is one session:
          // the partition after it must continue from the corrected session id and step.
          if (curFirst(2) == curLast(2)) {
            map.put(s"$cur$LastSuffix", s"$curLastMsg&$prevSession&${prevStep.toInt + curLast(4).toInt}")
          }
        }
      }
    }
    correctMap
  }

  /**
   * Returns the "&last" entry of a partition. A partition that only reported "&first" is taken
   * to hold a single record, whose last entry is its first entry at step 1 with size 1.
   */
  private def lastMessage(map: mutable.HashMap[String, String], partition: Int): String =
    map.getOrElse(s"$partition$LastSuffix", map(s"$partition$FirstSuffix") + "&1&1")

  /**
   * Applies the corrections computed by [[processBoundaryMap]] to the sessionized records.
   *
   * For partition `i`, a correction under "i&last" replaces the stay time of the partition's last
   * record, and a correction under "i&first" ("correctSessionId&stepOffset&sessionIdToReplace")
   * rewrites every record of the session to replace: it gets the correct session id and its step
   * is increased by the offset.
   *
   * @param questionSessionRdd records sessionized per partition by [[generateSessionId]]
   * @param questionBroadCast  broadcast corrections
   * @return the records with sessions stitched across partition boundaries
   */
  def repairBoundarySession(questionSessionRdd: RDD[(WebLogBean, String, Int, Long)],
                            questionBroadCast: Broadcast[mutable.HashMap[String, String]]): RDD[(WebLogBean, String, Int, Long)] = {
    questionSessionRdd.mapPartitionsWithIndex { (index, iter) =>
      // Read the broadcast inside the task: dereferencing it on the driver would capture the whole
      // map in the closure and ship it with every task, bypassing the broadcast.
      val questionMap: mutable.HashMap[String, String] = questionBroadCast.value
      val records: Vector[(WebLogBean, String, Int, Long)] = iter.toVector

      val withStayTime: Vector[(WebLogBean, String, Int, Long)] =
        questionMap.get(s"$index$LastSuffix") match {
          case Some(stay) if stay.nonEmpty && records.nonEmpty =>
            val (bean, sessionId, step, _) = records.last
            records.updated(records.size - 1, (bean, sessionId, step, stay.toLong))
          case _ => records
        }

      val repaired: Vector[(WebLogBean, String, Int, Long)] =
        questionMap.get(s"$index$FirstSuffix") match {
          case Some(line) if line.nonEmpty =>
            val parts: Array[String] = line.split("&")
            val correctSession: String = parts(0)
            val stepOffset: Int = parts(1).toInt
            val sessionToReplace: String = parts(2)
            withStayTime.map {
              case (bean, sessionId, step, stayTime) if sessionId == sessionToReplace =>
                (bean, correctSession, stepOffset + step, stayTime)
              case untouched => untouched
            }
          case _ => withStayTime
        }

      repaired.iterator
    }
  }

}
